Conveyor Thread Wrapper for Modern C++

21 Mar 2017, last revision: 29 Oct 2017. License: CPOL.
New thread wrapper (v. 2.0) offers programming model based on blocking queue and delegates supplied by other threads

Epigraph:

In the queue, you sons of bitches, in line!
M. A. Bulgakov, Heart of a Dog

Introduction

This is the second article from the short series devoted to thread wrappers:

  1. Thread Wrapper for Modern C++.
  2. Present article.

This new article combines several things.

First of all, the new conveyor wrapper is based on the previous article, where most principles of threading and thread synchronization are described in detail. I’ll have to point to that article in several places.

The next important component is the design and implementation of delegates taken from my article The Impossibly Fast C++ Delegates, Fixed. These delegates are faster than std::function and offer considerable convenience.

And, finally, all this is combined with the concept of a blocking queue, or blocking collection. One well-known example is the .NET FCL generic class BlockingCollection. In my article Simple Blocking Queue for Thread Communication and Inter-thread Invocation, I offered my implementation of the blocking queue (this work originated from .NET v.3.5, where BlockingCollection wasn’t yet introduced) and demonstrated various interesting use cases, including the one relevant to the present topic.

Such queues are a part of some very widely used programming models, such as message queues. It is very typical that one thread supplies data, pushing them into a queue, which is usually not a blocking call; and another thread is receiving them from a queue, which is usually a blocking call used to put a thread in the wait state when a queue is empty. Multiple threads can participate in this transport on both ends.
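
The producer/consumer arrangement described above can be sketched as follows. This is only a minimal illustration, not the blocking queue from the referenced article: the names BlockingQueue, Push, Pop and SumThroughQueue are hypothetical, and the sketch assumes a single consumer.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical minimal blocking queue, for illustration only.
template <typename T>
class BlockingQueue {
public:
    // Producer side: stores the value and returns without waiting for a consumer.
    void Push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(value));
        }
        notEmpty_.notify_one();
    }
    // Consumer side: blocks (thread in the wait state) while the queue is empty.
    T Pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop();
        return value;
    }
private:
    std::queue<T> items_;
    std::mutex mutex_;
    std::condition_variable notEmpty_;
};

// One producer thread pushes 1..count; the calling thread pops and sums them.
inline int SumThroughQueue(int count) {
    assert(count >= 0);
    BlockingQueue<int> queue;
    std::thread producer([&queue, count] {
        for (int i = 1; i <= count; ++i) queue.Push(i);
    });
    int sum = 0;
    for (int i = 0; i < count; ++i) sum += queue.Pop();
    producer.join();
    return sum;
}
```

Calling SumThroughQueue(100) should return 5050 regardless of how the two threads interleave, because every Pop waits until an element is available.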

The topic of this article is the specialized case of the queue, explained in my article on the blocking queues. Do the queue elements have to be just data? No. One important approach is to queue… the delegate instances. More exactly, a queue element could be a delegate instance packed with the values of the parameters needed for the delegate calls.

This is the way to defer execution of several tasks and serialize the execution. There is nothing wrong with consecutive execution of some tasks; just the opposite, using too many threads or an unpredictable number of threads is a pretty usual mistake. Still, parallelism is used: the set of queued tasks is executed in parallel with some other threads, such as the UI thread.

This approach covers a big and practically important class of applications. It’s very typical that we need to queue one or more tasks for background execution while doing other activities, such as UI operation, networking, communications, etc., in other threads. It’s also best to reuse the same thread during the whole application lifetime, instead of creating a thread again and again for every execution of a background task. It could be considered a more stable and predictable alternative to thread pools.
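
As a rough illustration of this idea (one long-lived thread that executes queued tasks one after another), here is a miniature sketch. It is not the ConveyorThreadWrapper itself: MiniConveyor and RunInOrder are hypothetical names, std::function stands in for the delegates, and throttling and abort are left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical miniature conveyor: each queue element is a callable
// packed with the argument for its call.
class MiniConveyor {
public:
    using Task = std::function<void(int)>;
    MiniConveyor() : worker_([this] { Run(); }) {}
    ~MiniConveyor() {
        Enqueue(Task(), 0); // an empty task serves as the stop marker
        worker_.join();
    }
    void Enqueue(Task task, int argument) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.emplace(std::move(task), argument);
        }
        condition_.notify_one();
    }
private:
    void Run() {
        while (true) {
            std::pair<Task, int> item;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                condition_.wait(lock, [this] { return !queue_.empty(); });
                item = std::move(queue_.front());
                queue_.pop();
            }
            if (!item.first) return;  // stop marker reached
            item.first(item.second);  // run the task outside the lock
        }
    }
    std::queue<std::pair<Task, int>> queue_;
    std::mutex mutex_;
    std::condition_variable condition_;
    std::thread worker_; // declared last, so it starts after the other members exist
};

// Queues two different tasks; the single worker runs them in queue order.
inline std::vector<int> RunInOrder() {
    std::vector<int> log;
    {
        MiniConveyor conveyor;
        conveyor.Enqueue([&log](int v) { log.push_back(v * 2); }, 1);
        conveyor.Enqueue([&log](int v) { log.push_back(v + 10); }, 2);
    } // the destructor lets the queued tasks finish and joins the worker
    assert(log.size() == 2);
    return log;
}
```

Because there is exactly one worker thread, the tasks never run concurrently with each other, only with the thread that queues them.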

Thread Wrapper Design

The method Body of the class ConveyorThreadWrapper is sealed with the final specifier. Control over the code to be executed in the thread is given to the user in a different form: the user can supply different delegates and push them onto the queue. I will refer to the actions performed by such delegate instances as tasks. The class is a template class; its only template parameter is the type of the delegate parameter. To queue a task, the user code supplies a delegate instance and a parameter value for its call.

This wrapper class can be used immediately, without a need to create derived classes. Of course, the delegates can be based on the member functions of such a derived class, but it’s more natural to base them on some user class. See also the section Delegate and Delegate Parameter Types.

The only thing to override would be the virtual function OnCurrentTaskAborted, but even this is not needed; instead, since v. 2.0, another delegate CurrentTaskAborted can be supplied by the user code for the same purpose.

The user code can request abort on two different levels: shallow and deep. Deep abort is propagated to the top stack frame of the thread, so the whole thread is terminated, but shallow abort is propagated only to the end of the currently executing task; after this abort, the thread takes another task from the queue, which leads either to the wait state (if the queue is empty) or to immediate execution of the next task.

Demonstration

To see how it all works, let’s look at the demonstration. Presently, there are two demonstrations shown one after another; the first one is the modified demonstration from the first article. Let’s look at the second one.

There are two different delegate instances implementing two different algorithms. The user is supposed to enter just one character at a time as a command, a character followed by [Enter]. Depending on the character, the thread can be put to sleep, woken up or aborted. The user can choose either the deep abort (quit) or the shallow abort, which aborts the presently executing task. For other characters, the code point of the entered character is passed to the delegate as its parameter, of the type used as the template parameter type. To handle the shallow abort, a class derived from ConveyorThreadWrapper is written:

C++
class IntegerConveyor : public ConveyorThreadWrapper<int> {
protected:
    virtual void OnCurrentTaskAborted(const int& current) override {
        std::cout << "Current task aborted: " << current;
        ConveyorThreadWrapper<int>::OnCurrentTaskAborted(current);
    } //OnCurrentTaskAborted
}; //class IntegerConveyor

Note the call to the method of the base class OnCurrentTaskAborted. Normally, this is not intended. The only reason I do it is to demonstrate the operation of the delegate CurrentTaskAborted, which could not be called without the base class’s method:

C++
cv.CurrentTaskAborted = [](int) -> void {
    std::cout << ". Enter any character: ";
}; //cv.CurrentTaskAborted

See also the complete demo.

This wrapper and its embedded thread are driven from a main thread from a console:

C++
class ConveyorThreadWrapperDemo : public ConveyorThreadWrapper<int> {
    enum class command {
        quit = 'q', // entire thread
        abort = 'a', // current task
        sleep = 's',
        wakeUp = 'w',
    };
    static const char* help() { return "a: task abort, q: quit, s: sleep, w: wake up, key: "; }
    static bool commandIs(char c, command cmd) { return (int)cmd == (int)c; }
public:
    using natural = unsigned long long int;
    static void Run(natural delayMs) {
        natural max = 100;
        IntegerConveyor cv;
        cv.CurrentTaskAborted = [](int) -> void {
            std::cout << ". Enter any character: ";
        }; //cv.CurrentTaskAborted
        using cType = decltype(cv);
        auto sleep = [delayMs] {
            std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
        }; //sleep
        auto Hailstone = [] (natural a) -> natural {
            if (a % 2 == 0)
                return a / 2;
            else
                return 3 * a + 1;
        }; //Hailstone (iteration)
        auto HailstoneStoppingTime = [Hailstone] (natural n) -> natural {
            natural stoppingTime = 0;
            while (n != 1) {
                n = Hailstone(n);
                ++stoppingTime;
            } //loop
            return stoppingTime;
        }; //HailstoneStoppingTime
        auto lambdaHailstone = [HailstoneStoppingTime, sleep, max] (cType::SyncDelegate& sync, int value) {
            for (natural count = 1; count < max; ++count) {
                natural stoppingTime = HailstoneStoppingTime(count + value);
                sync(false);
                std::cout
                    << count << ": " << help()
                    << value << " => Hailstone " << stoppingTime << std::endl;
                sleep();
            } //loop
        }; //lambdaHailstone
        natural mod = 1;
        mod = mod << 32; // NOT 1 << 32: the literal 1 is an int, and shifting a 32-bit int by 32 bits is undefined
        natural multiplier = 1664525; // Numerical Recipes
        natural increment = 1013904223; // Numerical Recipes
        auto Lgc = [mod, multiplier, increment](natural n) -> natural {
            return (n * multiplier + increment) % mod;
        }; //Lgc
        auto lambdaLgc = [Lgc, sleep, max](cType::SyncDelegate& sync, int value) {
            natural n = value;
            for (natural count = 0; count < max; count++) {
                n = Lgc(n);
                sync(false);
                std::cout
                    << help()
                    << value << " => LGC " << (n) << std::endl;
                sleep();
            } //loop
        }; //lambdaLgc
        SA::delegate<void(cType::SyncDelegate&, int)> delHailstone = lambdaHailstone;
        SA::delegate<void(cType::SyncDelegate&, int)> delLgc = lambdaLgc;
        cv.Start();
        char cmd;
        while (true) {
            std::cin >> cmd;
            if (commandIs(cmd, command::abort))
                cv.Abort();
            else if (commandIs(cmd, command::quit)) {
                cv.Abort(cType::AbortDepth::entierThread);
                break;
            } else if (commandIs(cmd, command::sleep))
                cv.PutToSleep();
            else if (commandIs(cmd, command::wakeUp))
                cv.WakeUp();
            else {
                int cmdValue = (int)cmd;
                if (cmdValue % 2 == 0)
                    cv.Enqueue(delHailstone, cmdValue);
                else
                    cv.Enqueue(delLgc, cmdValue);
            } //if
        } //loop
        cv.Join();
    } //Run
}; //class ConveyorThreadWrapperDemo

This demo shows the use of lambda expressions which are extremely convenient for the case. Alternatively, the mechanism of delegates also allows for the use of static functions and constant or non-constant instance functions, as it is explained in the article on delegates.

By the way, note a pretty obvious but important lambda expression technique: passing some objects through closure and not through the parameter list. For example, the lambda expression sleep uses the value delayMs from the parameter of the method Run as auto sleep = [delayMs].

For this application, this is essential. It could be passed in a parameter list, but let’s see how sleep is used. Ultimately, all those lambda expressions are called by the lambda expressions lambdaHailstone and lambdaLgc and also are passed through their closures. These two ultimate lambda expressions are never called in this context. Instead, they are assigned to the delegate instances. Their parameter lists should fit the profile of the delegate (shown here) and cannot accept other actual parameters.
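
The point about fixed parameter lists can be shown in a few lines. In this sketch, std::function<int(int)> is only a stand-in for the delegate profile, and MakeScaledTask and ClosureExample are hypothetical names; everything beyond the single int parameter has to travel through the closure.

```cpp
#include <cassert>
#include <functional>

// The fixed call profile every task must fit (a stand-in for the delegate type).
using Task = std::function<int(int)>;

inline Task MakeScaledTask(int scale) {
    // The helper gets "scale" through its closure, not through a parameter.
    auto helper = [scale](int x) { return x * scale; };
    // The helper itself is passed on through the closure of the outer lambda,
    // so the parameter list stays exactly (int).
    return [helper](int value) { return helper(value) + 1; };
}

inline void ClosureExample() {
    Task task = MakeScaledTask(3);
    assert(task(5) == 16); // 5 * 3 + 1
}
```

Note that std::function copies the lambda together with its captures, so this sketch does not have the lifetime restrictions discussed for the delegates in the section Delegate and Delegate Parameter Types.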

Note that multiple characters can be entered sequentially while some task is being executed. This demonstrates the case when new tasks are queued while one of the tasks is being executed. If, say, N tasks were queued during execution of another task, it will take up to N+1 current task aborts to bring the thread to a wait state. When the presently executing task is aborted, the next task is taken for execution from the queue.

If shallow abort is not performed for a while, the current task eventually completes execution. With an empty queue, this puts the thread into the wait state. The thread can be woken up by adding another task to the queue. I intentionally specified a small number of iterations to demonstrate this condition. There is no need to add another virtual function or a delegate to let the user code hook up this event, because the appropriate action, if needed, can simply be written as a part of the task delegate code.

In this code sample, I tried to shorten the text at the expense of purely mathematical algorithms and leave the “system” aspects. At the same time, the fun part is about those algorithms.

Demonstrated Algorithms

Let’s make a short fun break not really related to the topic of this article. In the demo application, to demonstrate passing different delegates to the queue, I’ve implemented the choice between two algorithms (the chosen algorithm depends on the parity of the code point of the user-entered character): calculation of the total stopping time of a sequence of Hailstone numbers and a linear congruential generator.

The first is a very intricate mathematical problem, most likely not solved at the moment of writing; it is considered an “extraordinarily difficult problem, out of reach of the present day mathematics”. The problem is called the Collatz conjecture, or the 3n+1 conjecture. It says that the sequence of numbers defined by the recursive process n->n/2 for even n and n->3n+1 for odd n will eventually reach the value of 1 for any positive initial n. So far, we don’t know any numbers which don’t behave this way, but it’s not proven that such numbers don’t exist. In my demo, I calculate the Hailstone number sequences themselves, but display only the number of iterations required to reach 1, which is called the total stopping time for a given initial number n. For the initial n, I take the sum of the code point of the user-entered character and the iteration number of an outer loop. This total stopping time and its dependency on the initial number is one of the real mathematical wonders; do read the article, it is very interesting.
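
As a small worked example: starting from 6, the sequence is 6, 3, 10, 5, 16, 8, 4, 2, 1, so the total stopping time of 6 is 8. The following standalone function repeats the calculation done by the demo lambdas; the names TotalStoppingTime and StoppingTimeExample are hypothetical, and the sketch assumes that the intermediate values fit into 64 bits.

```cpp
#include <cassert>
#include <cstdint>

// Total stopping time: the number of Hailstone iterations needed to reach 1.
// Precondition: n >= 1 (for n == 0 the loop would never end).
inline std::uint64_t TotalStoppingTime(std::uint64_t n) {
    assert(n >= 1);
    std::uint64_t steps = 0;
    while (n != 1) {
        n = (n % 2 == 0) ? n / 2 : 3 * n + 1;
        ++steps;
    }
    return steps;
}

inline void StoppingTimeExample() {
    assert(TotalStoppingTime(1) == 0);    // already at 1
    assert(TotalStoppingTime(6) == 8);    // 6, 3, 10, 5, 16, 8, 4, 2, 1
    assert(TotalStoppingTime(27) == 111); // a famously long run for a small n
}
```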

The second algorithm is a very simple algorithm for generation of a deterministic sequence of pseudo-random numbers: a linear congruential generator. The algorithm requires three carefully chosen numeric parameters, which I take according to Numerical Recipes.

The implementations of these algorithms are way too trivial to discuss in detail here; everyone is welcome to look at them in the downloadable source code.
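
One step of the generator computes n' = (a·n + c) mod m, with a = 1664525, c = 1013904223 and m = 2^32, the values used in the demo. A standalone sketch of that step, with hypothetical names NextLcg and LcgExample:

```cpp
#include <cassert>
#include <cstdint>

// One step of the linear congruential generator with the demo's constants.
// Precondition: n < 2^32, so the 64-bit product cannot overflow.
inline std::uint64_t NextLcg(std::uint64_t n) {
    const std::uint64_t modulus = std::uint64_t(1) << 32;
    const std::uint64_t multiplier = 1664525;
    const std::uint64_t increment = 1013904223;
    assert(n < modulus);
    return (n * multiplier + increment) % modulus;
}

inline void LcgExample() {
    std::uint64_t n = NextLcg(0);
    assert(n == 1013904223); // 0 * a + c
    n = NextLcg(n);
    assert(n == 1196435762); // 0x47502932
}
```

The sequence is fully determined by the seed, which is why the demo output for a given character is repeatable.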

Giving the Wrapper a Task

This is how a task can be queued:

C++
template<typename PARAM>
class ConveyorThreadWrapper : public ThreadWrapper {
public:
    
    // ...

    void Enqueue(TaskDelegate& task, PARAM argument) {
        std::lock_guard<std::mutex> lock(queueMutex);
        queue.push(new QueueItem(task, argument));
        queueCondition.notify_one();
    } //Enqueue

    // ...

}; //class ConveyorThreadWrapper

The thread is notified of the new task. This is important if the queue is currently empty, so the thread is in the wait state and needs to be woken up. Let’s look at the queue synchronization.

Queue Synchronization

The elements stored in the queue are of the type QueueItem holding both pointers to the delegate instances and the values of the parameters needed for the call. The synchronization primitives queueMutex and queueCondition are used to synchronize operation of the function Enqueue shown above and the thread running the task:

C++
template<typename PARAM>
class ConveyorThreadWrapper : public ThreadWrapper {
// ...
private:

    // ...

    struct QueueItem {
        QueueItem() = default;
        QueueItem(TaskDelegate& del, PARAM param) {
            delegate = &del;
            parameter = param;
        } //QueueItem
        TaskDelegate* delegate;
        PARAM parameter;
    }; //struct QueueItem

    std::queue<QueueItem*> queue; // protected by queueState and queueMutex
    std::mutex queueMutex;
    std::condition_variable queueCondition;

    void getTask(QueueItem& itemCopy) {
        std::unique_lock<std::mutex> ul(queueMutex);
        queueCondition.wait(ul, [=] {
            return queue.size() > 0;
        });
        auto front = queue.front();
        itemCopy.delegate = front->delegate;
        itemCopy.parameter = front->parameter;
        queue.pop();
        delete front;
    } //getTask
}; //class ConveyorThreadWrapper

In addition to that, we need to keep the task responsive to thread throttling (described in detail in the previous article) and abort.

Thread State Synchronization

In the class ThreadWrapper, the function SyncPoint was immediately accessible to the thread code as a protected member function. In the delegate approach, this function is not accessible to the task code. Instead, a delegate instance based on this function is passed to each task delegate call. The class ThreadWrapper provides a static function which can be used by derived classes to create such a delegate instance.

C++
class ThreadWrapper {
// ...
protected:

    // ...

    static void InitializeSyncPointDelegate(SA::delegate<void(bool)>& del, ThreadWrapper* instance) {
        del = SA::delegate<void(bool)>::create<ThreadWrapper, &ThreadWrapper::SyncPoint>(instance);
    } //InitializeSyncPointDelegate

    // ...

}; //class ThreadWrapper
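
The general idea of handing a synchronization point to a task as a callable can be sketched like this. It is a simplified illustration only, not the ThreadWrapper implementation: MiniWorkerState, TaskAborted and IterationsBeforeAbort are hypothetical names, std::function replaces SA::delegate, only the abort request is modelled (no sleep/wake-up), and the bool parameter of the real SyncPoint is omitted because its meaning is described in the previous article, not here.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <stdexcept>

// Hypothetical exception used to unwind a task that has been asked to stop.
struct TaskAborted : std::runtime_error {
    TaskAborted() : std::runtime_error("task aborted") {}
};

// Hypothetical state shared between the controlling thread and the task.
class MiniWorkerState {
public:
    void RequestAbort() { abortRequested_ = true; }
    // Called by the task at points where it is safe to stop.
    // Clears the request, so the next task starts with a clean state.
    void SyncPoint() {
        if (abortRequested_.exchange(false)) throw TaskAborted();
    }
private:
    std::atomic<bool> abortRequested_{false};
};

// The task sees only the callable, not the object behind it.
inline int IterationsBeforeAbort(int abortAt) {
    MiniWorkerState state;
    std::function<void()> sync = [&state] { state.SyncPoint(); };
    int done = 0;
    try {
        for (int i = 0; i < 100; ++i) {
            if (i == abortAt) state.RequestAbort(); // normally done by another thread
            sync();  // throws once an abort has been requested
            ++done;
        }
    } catch (const TaskAborted&) {
        // the task ends early; the caller decides what happens next
    }
    return done;
}

inline void SyncPointExample() {
    assert(IterationsBeforeAbort(3) == 3);      // iterations 0, 1, 2 complete
    assert(IterationsBeforeAbort(1000) == 100); // never aborted
}
```

The more often a task calls the synchronization point, the faster it reacts to a request from the controlling thread.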

To see how the task delegates are called, we need to look at the overridden function Body. Also, we can see how shallow abort is handled.

Handling Shallow Abort

Let’s take a look at the overridden method Body:

C++
template<typename PARAM>
class ConveyorThreadWrapper : public ThreadWrapper {
// ...
protected:

    void Body() override final {
        while (true) {
            QueueItem task;
            getTask(task);
            try {
                (*task.delegate)(syncDelegate, task.parameter);
            } catch (ShallowThreadAbortException&) {
                SetAbort(false);
                OnCurrentTaskAborted(task.parameter);
            } //exception
        } //loop
    } //Body

    // ...

}; //class ConveyorThreadWrapper

This code sample shows how the task delegates are called. Also, let’s look at the ShallowThreadAbortException handling. First of all, note that the abort condition is cleared. This is needed for the next task to execute immediately, as soon as a new element is queued. The abort condition is set and cleared in the base class ThreadWrapper, with synchronization. This is shown in the previous article.

Remember, all other exceptions, including the regular deep ThreadAbortException, are handled in the outer context of Body. This is explained in detail in the previous article.
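
The two levels of exception handling can be modelled without any threads. In this sketch, ShallowAbort, DeepAbort, RunTasks and AbortLevelsExample are hypothetical stand-ins: the inner handler corresponds to the catch block inside Body, and the outer handler to the context that calls Body.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

struct ShallowAbort {}; // hypothetical: ends only the current task
struct DeepAbort {};    // hypothetical: ends the whole processing loop

inline std::string RunTasks(const std::vector<std::function<void()>>& tasks) {
    std::string log;
    try {
        for (const auto& task : tasks) {
            try {
                task();
                log += "done;";
            } catch (const ShallowAbort&) {
                log += "task aborted;"; // recover and go on to the next task
            }
        }
    } catch (const DeepAbort&) {
        log += "thread aborted;"; // outer context: remaining tasks never run
    }
    return log;
}

inline void AbortLevelsExample() {
    std::vector<std::function<void()>> tasks = {
        [] {},
        [] { throw ShallowAbort(); },
        [] {},
        [] { throw DeepAbort(); },
        [] {}, // never reached
    };
    assert(RunTasks(tasks) == "done;task aborted;done;thread aborted;");
}
```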

This is how the user can choose between shallow and deep abort:

C++
class ThreadWrapper {
public:

    // ...

    enum class AbortDepth { currentTask, entierThread, };
    void Abort(AbortDepth depth = AbortDepth::currentTask) {
        SetAbort(true, depth == AbortDepth::currentTask);
    } //Abort

    // ...

}; //class ThreadWrapper

Delegate and Delegate Parameter Types

This is how the involved delegate types are defined:

C++
using SyncDelegate = SA::delegate<void(bool)>;
using TaskDelegate = SA::delegate<void(SyncDelegate&, PARAM)>;

The delegate type TaskDelegate is the type stored in the queue, and SyncDelegate reflects the profile of SyncPoint. The delegate instance based on SyncPoint is passed as the first parameter to the task function to give the application developer the opportunity to keep the task responsive to thread throttling and aborts. Note that the task functions don’t have to be implemented in a class derived from the thread wrapper class; they can be implemented in a user’s class, where there is no access to the SyncPoint function itself.

As it is explained in my article on delegates, a delegate instance can be constructed out of a static function, instance function, constant instance function or a lambda expression. The closure capture is also supported, which is a very delicate issue (for example, passing an instance of a lambda expression breaks the capture).

The user of the class is responsible for the memory management for delegate instances. First of all, the delegates should be called in the scope of the lambda expression used. The C++ standard explicitly specifies that captured context is not preserved out of the scope of the lambda expression declaration.
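
The lifetime rule can be illustrated with a tiny non-owning callable reference. This is not SA::delegate; IntCallableRef and CallWhileAlive are hypothetical, and the sketch only assumes that a delegate built from a lambda keeps a pointer to the lambda object instead of a copy.

```cpp
#include <cassert>

// Hypothetical non-owning reference to any callable of the form int(int).
class IntCallableRef {
public:
    template <typename F>
    explicit IntCallableRef(F& callable)
        : object_(&callable),
          invoke_([](void* object, int x) -> int {
              return (*static_cast<F*>(object))(x);
          }) {}
    int operator()(int x) const { return invoke_(object_, x); }
private:
    void* object_;               // points at the caller's lambda; no copy is made
    int (*invoke_)(void*, int);  // knows how to call through that pointer
};

inline int CallWhileAlive() {
    int offset = 5;
    auto addOffset = [offset](int x) { return x + offset; };
    IntCallableRef ref(addOffset); // valid only while addOffset is in scope
    return ref(10);                // fine: addOffset still exists here
}

inline void LifetimeExample() {
    assert(CallWhileAlive() == 15);
}
```

Returning ref from CallWhileAlive and calling it later would use a dangling pointer, because addOffset and its captured offset are destroyed at the end of the function. For the conveyor, this means that a lambda behind a queued delegate must stay alive until the task has been executed.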

The PARAM value is passed by value and stored in the wrapper queue. This seems reasonable: the whole idea of the queue is to keep everything needed for the delegate calls. At the same time, not all parameter types are suitable as a ConveyorThreadWrapper template parameter. It’s not so easy to list all requirements, but the potential problems can be easily identified during application development. Evidently, the default constructor should not be deleted, the type should support copy construction and copy assignment, and so on. In particular, if the parameter structure has direct or indirect pointers, it is the responsibility of the application developer to make sure that those pointers point to valid memory objects during the delegate call. Another issue with those pointers is this: if they can be used in the thread of the wrapper, they and the pointed objects should be subject to thread synchronization. The application developer should take care of all such issues. One encapsulated synchronization mechanism suitable for many typical cases is offered in the previous article.
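
Some of these requirements can be checked at compile time. The following sketch derives them from the QueueItem code shown above (a defaulted default constructor and the assignment parameter = param); the helper IsUsableConveyorParam and the type NoDefault are hypothetical, and the check says nothing about pointer validity or thread safety.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <type_traits>

// Hypothetical helper: true if PARAM can be default-constructed, copied and assigned.
template <typename PARAM>
constexpr bool IsUsableConveyorParam() {
    return std::is_default_constructible<PARAM>::value
        && std::is_copy_constructible<PARAM>::value
        && std::is_copy_assignable<PARAM>::value;
}

// A type without a default constructor, for demonstration.
struct NoDefault {
    explicit NoDefault(int) {}
};

static_assert(IsUsableConveyorParam<int>(), "int is a suitable parameter type");
static_assert(IsUsableConveyorParam<std::string>(), "std::string is suitable as well");

inline void ParamCheckExample() {
    assert(!IsUsableConveyorParam<NoDefault>());             // no default constructor
    assert(!IsUsableConveyorParam<std::unique_ptr<int>>());  // not copyable
}
```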

Compatibility and Build

The whole thread wrapper solution is contained in just five header files:

  • “ThreadWrapper.h”,
  • “InterlockedProperty.h”,
  • “DelegateBase.h”,
  • “Delegate.h”,
  • “ConveyorThreadWrapper.h”,

they can be added to any project.

The compiler should support C++11 or a later standard. For GCC, this is an option which should be set to -std=c++11 or, say, -std=c++14.

The demo project is provided in two forms: 1) a Visual Studio 2015 solution and project using the Microsoft C++ compiler and Clang (see “ThreadWrapper.sln”) and 2) a Code::Blocks project using GCC (“ThreadWrapper.cbp”). For all other options, one can assemble a project or a make file by adding all “*.h” and “*.cpp” files in the code directory “Cpp”, a subdirectory of the directory of the solution file.

I tested the code with Visual Studio 2015, Clang 4.0.0, GCC 5.1.0.

The C++ options included “disable language extensions” (/Za for Microsoft and Clang), which seems to be essential for Microsoft.

Versions

1.0

Initial version, March 21, 2017

2.0

March 24, 2017

The class ConveyorThreadWrapper has been simplified through better reuse of the ThreadWrapper code, which also has been updated. Thread state synchronization was separated from queue synchronization, which is potentially better for the throughput. The demo code demonstrating both classes was re-designed. The previous article, Thread Wrapper for Modern C++, is updated.

2.1

October 29, 2017

Fixed a bug in ThreadWrapper::ExceptionCaught function signature. Must be: virtual void ExceptionCaught(std::exception& exception) {}.

Conclusions

I really think that the two thread wrapper types, ThreadWrapper and ConveyorThreadWrapper, cover the programming models most important for most typical practical cases. Also, they can help to simplify and streamline thread programming in general. The first of these two wrapper types, being less specialized, imposes no limitations on leveraging the full power of general C++ threading.

I will greatly appreciate all informative criticism, issue reports, notes and suggestions for improvements.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)